package com.future.bs.component.stream;

import com.future.bs.component.business.Business;
import com.future.bs.context.Context;
import com.future.bs.component.report.ParalleledReport;
import com.future.bs.component.report.Report;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;

/**
 * <p>
 * Business stream that executes all of its businesses concurrently on an
 * {@link ExecutorService}; intended for validation-style business flows.
 * </p>
 *
 * @author Jingjie
 * @since 2020-11-08 09:58
 */
public class ParalleledStream<T extends Context> extends Stream<T> {
    private static final Logger LOGGER = LoggerFactory.getLogger(ParalleledStream.class);

    /** Maximum time, in seconds, to wait for the result of each business. */
    private static final long RESULT_TIMEOUT_SECONDS = 30L;

    private List<Business<T>> businesses;
    private ExecutorService executorService;

    /**
     * Runs every configured business concurrently against the given context and
     * collects the individual reports into one {@link ParalleledReport}.
     *
     * @param context business context handed to every business
     * @return report aggregating the reports of all businesses, in configuration order
     * @throws RuntimeException if waiting is interrupted, a business does not finish
     *                          in time, or a business fails
     */
    @Override
    public ParalleledReport<T> execute(T context) {
        // Kick off all businesses concurrently.
        List<Future<Report<T>>> futures = parallelExecute(context);
        // Wait for and collect the results.
        List<Report<T>> reports = getReports(futures);
        return new ParalleledReport<>(reports);
    }

    /**
     * Submits one task per business to the executor without waiting for completion.
     *
     * <p>Tasks are submitted individually rather than through
     * {@code invokeAll(tasks)}: {@code invokeAll} only returns once every task has
     * completed, which would make the timed wait in {@link #getReports(List)}
     * ineffective.</p>
     *
     * @param context business context handed to every business
     * @return one future per business, in configuration order
     */
    private List<Future<Report<T>>> parallelExecute(T context) {
        List<Future<Report<T>>> futures = new ArrayList<>(this.businesses.size());
        try {
            for (Business<T> business : this.businesses) {
                Callable<Report<T>> task = () -> business.execute(context);
                futures.add(this.executorService.submit(task));
            }
        } catch (RuntimeException e) {
            // Submission failed part-way (e.g. the executor rejected a task):
            // do not leave the already submitted tasks running unattended.
            cancelAll(futures);
            throw e;
        }
        return futures;
    }

    /**
     * Collects the result of every concurrent business, waiting at most
     * {@value #RESULT_TIMEOUT_SECONDS} seconds for each one. On any failure the
     * outstanding tasks are cancelled before the exception is propagated.
     *
     * @param futures futures of the concurrently running businesses
     * @return the reports of all businesses, in the order of {@code futures}
     * @throws RuntimeException wrapping the {@link InterruptedException},
     *                          {@link TimeoutException} or {@link ExecutionException}
     *                          that ended the wait
     */
    private List<Report<T>> getReports(List<Future<Report<T>>> futures) {
        List<Report<T>> reports = new ArrayList<>(futures.size());
        String name = getName();
        try {
            for (Future<Report<T>> future : futures) {
                reports.add(future.get(RESULT_TIMEOUT_SECONDS, TimeUnit.SECONDS));
            }
        } catch (InterruptedException e) {
            cancelAll(futures);
            // Restore the interrupt flag so code further up the stack can still observe it.
            Thread.currentThread().interrupt();
            String message = String
                    .format("The parallel stream '%s' was interrupted while waiting for business result.", name);
            LOGGER.error(message, e);
            throw new RuntimeException(message, e);
        } catch (TimeoutException e) {
            cancelAll(futures);
            String message = String
                    .format("The parallel stream '%s' was time out while waiting for business result.", name);
            LOGGER.error(message, e);
            throw new RuntimeException(message, e);
        } catch (ExecutionException e) {
            cancelAll(futures);
            String message = String.format("Unable to execute parallel stream '%s'.", name);
            LOGGER.error(message, e);
            throw new RuntimeException(message, e);
        }
        return reports;
    }

    /**
     * Cancels every given future, interrupting tasks that are still running.
     * Cancelling a future that has already completed has no effect.
     *
     * @param futures futures to cancel
     */
    private void cancelAll(List<Future<Report<T>>> futures) {
        for (Future<Report<T>> future : futures) {
            future.cancel(true);
        }
    }

    /**
     * <p>
     * Builder for {@link ParalleledStream}.
     * </p>
     *
     * <p>This is an inner (non-static) class: {@link #build()} configures and
     * returns the enclosing stream instance instead of creating a new one.</p>
     *
     * @author Jingjie
     * @since 2020-11-08 09:55
     */
    public class Builder {
        private String name;
        private List<Business<T>> businesses = new ArrayList<>();
        private ExecutorService executorService;

        /**
         * Sets the name of the stream.
         *
         * @param name stream name
         * @return this builder
         */
        public Builder name(String name) {
            this.name = name;
            return this;
        }

        /**
         * Appends a single business to the stream.
         *
         * @param business business to run
         * @return this builder
         */
        public Builder process(Business<T> business) {
            this.businesses.add(business);
            return this;
        }

        /**
         * Appends several businesses to the stream, keeping their order.
         *
         * @param businesses businesses to run
         * @return this builder
         */
        public Builder process(List<Business<T>> businesses) {
            this.businesses.addAll(businesses);
            return this;
        }

        /**
         * Sets the executor on which the businesses are run.
         *
         * @param executorService executor used for concurrent execution
         * @return this builder
         */
        public Builder parallel(ExecutorService executorService) {
            this.executorService = executorService;
            return this;
        }

        /**
         * Applies the collected settings to the enclosing stream and returns it.
         *
         * @return the enclosing, now configured, stream
         */
        public ParalleledStream<T> build() {
            ParalleledStream.this.name = this.name;
            // Copy the list so that later calls to process(...) on this builder
            // cannot alter a stream that has already been built.
            ParalleledStream.this.businesses = new ArrayList<>(this.businesses);
            ParalleledStream.this.executorService = this.executorService;
            return ParalleledStream.this;
        }
    }
}
